package org.eclipse.moquette.spi.impl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.HdrHistogram.Histogram;
import org.eclipse.moquette.proto.messages.AbstractMessage;
import org.eclipse.moquette.server.ServerChannel;
import org.eclipse.moquette.spi.IMessagesStore;
import org.eclipse.moquette.spi.IMessaging;
import org.eclipse.moquette.spi.ISessionsStore;
import org.eclipse.moquette.spi.impl.events.LostConnectionEvent;
import org.eclipse.moquette.spi.impl.events.MessagingEvent;
import org.eclipse.moquette.spi.impl.events.ProtocolEvent;
import org.eclipse.moquette.spi.impl.events.StopEvent;
import org.eclipse.moquette.spi.impl.security.*;
import org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore;
import org.eclipse.moquette.spi.persistence.MapDBPersistentStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.eclipse.moquette.commons.Constants.PASSWORD_FILE_PROPERTY_NAME;
import static org.eclipse.moquette.commons.Constants.PERSISTENT_STORE_PROPERTY_NAME;
import static org.eclipse.moquette.commons.Constants.ALLOW_ANONYMOUS_PROPERTY_NAME;
import static org.eclipse.moquette.commons.Constants.ACL_FILE_PROPERTY_NAME;

/**
 * Created by alex on 15-5-30.
 */
public class SimpleMessaging implements IMessaging, EventHandler<ValueEvent> {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessaging.class);

    private SubscriptionsStore subscriptions;

    private RingBuffer<ValueEvent> m_ringBuffer;

    private IMessagesStore m_storageService;
    private ISessionsStore m_sessionsStore;

    private ExecutorService m_executor;
    private Disruptor<ValueEvent> m_disruptor;

    private static SimpleMessaging INSTANCE;

    private final ProtocolProcessor m_processor = new ProtocolProcessor();
    private final AnnotationSupport annotationSupport = new AnnotationSupport();
    private boolean benchmarkEnabled = false;

    CountDownLatch m_stopLatch;

    Histogram histogram = new Histogram(5);

    private SimpleMessaging() {

    }

    /**
     * 获得SimpleMessaging 的单例
     * @return
     */
    public static SimpleMessaging getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new SimpleMessaging();
        }
        return INSTANCE;
    }

    /**
     * 初始化入站事件的处理器
     * @param configProps
     */
    public void init(Properties configProps) {
        subscriptions = new SubscriptionsStore();
        m_executor = Executors.newFixedThreadPool(1);
        m_disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32, m_executor);

       /*Disruptor<ValueEvent> m_disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, 1024 * 32, m_executor,
                ProducerType.MULTI, new BusySpinWaitStrategy());*/
        m_disruptor.handleEventsWith(this);
        m_disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        m_ringBuffer = m_disruptor.getRingBuffer();

        annotationSupport.processAnnotations(m_processor);//通过反射找到使用@MQTTMessage 注解的方法

        processInit(configProps);
//        disruptorPublish(new InitEvent(configProps));


    }

    /**
     * 生产事件 --> ringBuffer
     * @param msgEvent
     */
    private void disruptorPublish(MessagingEvent msgEvent) {
        LOG.debug("disruptorPublish publishing event {}", msgEvent);
        long sequence = m_ringBuffer.next();
        ValueEvent event = m_ringBuffer.get(sequence); //ValueEvent 封装我们的session 和 message数据

        event.setEvent(msgEvent);

        m_ringBuffer.publish(sequence);
    }

    /**
     * ringBuffer -----> 消费事件
     * @param valueEvent
     * @param l
     * @param b
     * @throws Exception
     */
    @Override
    public void onEvent(ValueEvent valueEvent, long l, boolean b) throws Exception {

        MessagingEvent evt = valueEvent.getEvent();
        LOG.info("onEvent processing messaging event from input ringbuffer {}", evt);
        if (evt instanceof StopEvent) {
            processStop();
            return;
        }

        /**
         * 连接断开事件
         */
        if (evt instanceof LostConnectionEvent) {
            LostConnectionEvent lostEvt = (LostConnectionEvent) evt;
            m_processor.processConnectionLost(lostEvt);
            return;
        }

        /**
         * 协议事件
         */
        if (evt instanceof ProtocolEvent) {
            ServerChannel session = ((ProtocolEvent) evt).getSession();
            AbstractMessage message = ((ProtocolEvent) evt).getMessage();

            try {
                long startTime = System.nanoTime();
                annotationSupport.dispatch(session, message); //根据messageType 指定处理的方法
                if (benchmarkEnabled) {
                    long delay = System.nanoTime() - startTime;
                    histogram.recordValue(delay);//记录处理事件
                }
            } catch (Throwable th) {
                LOG.error("Serious error processing the message {} for {}", message, session, th);
            }
        }

    }

    @Override
    public void stop() {
        m_stopLatch = new CountDownLatch(1);
        disruptorPublish(new StopEvent());
        try {
            //wait the callback notification from the protocol processor thread
            LOG.debug("waiting 10 sec to m_stopLatch");
            boolean elapsed = !m_stopLatch.await(10, TimeUnit.SECONDS);
            LOG.debug("after m_stopLatch");
            m_executor.shutdown();
            m_disruptor.shutdown();
            if (elapsed) {
                LOG.error("Can't stop the server in 10 seconds");
            }
        } catch (InterruptedException ex) {
            LOG.error(null, ex);
        }
    }

    @Override
    public void lostConnection(ServerChannel session, String clientID) {
        disruptorPublish(new LostConnectionEvent(session, clientID));
    }

    @Override
    public void handleProtocolMessage(ServerChannel session, AbstractMessage msg) {
        disruptorPublish(new ProtocolEvent(session, msg));
    }

    private void processInit(Properties props) {
        benchmarkEnabled = Boolean.parseBoolean(System.getProperty("moquette.processor.benchmark", "false"));

        //TODO use a property to select the storage path
        MapDBPersistentStore mapStorage = new MapDBPersistentStore(props.getProperty(PERSISTENT_STORE_PROPERTY_NAME, ""));
        m_storageService = mapStorage;
        m_sessionsStore = mapStorage;

        m_storageService.initStore();

        //List<Subscription> storedSubscriptions = m_sessionsStore.listAllSubscriptions();
        //subscriptions.init(storedSubscriptions);
        subscriptions.init(m_sessionsStore);

        String passwdPath = props.getProperty(PASSWORD_FILE_PROPERTY_NAME, "");
        String configPath = System.getProperty("moquette.path", null);
        IAuthenticator authenticator;
        if (passwdPath.isEmpty()) {
            authenticator = new AcceptAllAuthenticator();
        } else {
            authenticator = new FileAuthenticator(configPath, passwdPath);
        }

        String aclFilePath = props.getProperty(ACL_FILE_PROPERTY_NAME, "");
        IAuthorizator authorizator;
        if (aclFilePath != null && !aclFilePath.isEmpty()) {
            authorizator = new ACLAuthorizator(aclFilePath);
        } else {
            authorizator = new PermitAllAuthorizator();
        }

        boolean allowAnonymous = Boolean.parseBoolean(props.getProperty(ALLOW_ANONYMOUS_PROPERTY_NAME, "true"));
        m_processor.init(subscriptions, m_storageService, m_sessionsStore, authenticator, allowAnonymous, authorizator);
    }

    private void processStop() {
        LOG.debug("processStop invoked");
        m_storageService.close();
        LOG.debug("subscription tree {}", subscriptions.dumpTree());
//        m_eventProcessor.halt();
//        m_executor.shutdown();

        subscriptions = null;
        m_stopLatch.countDown();

        if (benchmarkEnabled) {
            //log metrics
            histogram.outputPercentileDistribution(System.out, 1000.0);
        }
    }
}
